[dbt] 「incremental」というMaterializationを使ってデータモデルを増分更新する
大阪オフィスの玉井です。
以前、dbtのデータモデルの「Materialization」という設定について検証しました。
上記で紹介していないMaterializationの種類に「incremental」というものがあります。これは、データモデルとしては、普通にテーブルとして生成するのですが、初回生成以降については、データの増分更新ができるようになります。
検証がちょっと面倒だったので、今回、別記事に切り出した形で、実際に検証してみたいと思います。
検証環境
- macOS Catalina 10.15.7
- dbt CLI 0.18.1
- Google BigQuery
やってみた
前準備
下記の環境を準備しておきます(手軽にデータを増やせるので)。ただし、今回使用するDWHはBigQueryです。
ロードしたデータは下記の通り。これが今回の検証におけるローデータとなります。この時点で1560件あります(後で増やします)。
「incremental」モデルを作成する
モデルファイル
準備したローデータを参照して、「incremental」なデータモデルを作成します。クエリは下記の通り。
{{ config( materialized='incremental' ) }} SELECT _fivetran_synced ,_modified ,change ,price ,sector ,ticker_symbol FROM `tamai-rei.kinesis.dev_stream_data` {% if is_incremental() %} where _fivetran_synced > (select max(_fivetran_synced) from {{ this }}) {% endif %}
config
今回検証するMaterializationはincrementalなので、それをconfig
で指定します。
is_incremental
ここがincrementalモデルのミソになります。incrementalモデルは増分更新なので、「どこからどこまでを増分するのか」を判断する基準となるロジックを組み込む必要があります。
要するに、「増分したレコードだけを特定する」必要があるわけです。方法はデータモデルによって千差万別ですが、今回は日付を使います。いまデータモデルに存在するレコードより新しいレコードだけを増分更新の対象にします。
まず、増分の基準には、WHERE句を使います。今回、データのロードにはFivetranを使っているため、Fivetranでロードした日付として_fivetran_synced
というカラムがあります。現在のデータモデルのカラムの最新日付をとり、それよりも新しい日付のレコードだけをデータモデルに対して増分更新するという仕組みです。
そして、このWHERE句は、is_incremental
という条件式がTrueの時だけ動くようになっています。is_incremental
は、下記の条件を満たした時にTrueとなります。
- Materializationが
incremental
である - データモデルがDWHに既に存在する(= 初回作成ではない)
--full-refresh
のオプションが無い状態で実行されている
ざっくりいうと、データモデルの更新時のときだけ動作するようになる、という感じです。
モデルの実行
上記のデータモデルを初めて実行します。
$ dbt run --models stg_stream_data Running with dbt=0.18.1 Found 6 models, 11 tests, 0 snapshots, 2 analyses, 326 macros, 0 operations, 1 seed file, 2 sources 15:40:56 | Concurrency: 4 threads (target='kinesis') 15:40:56 | 15:40:56 | 1 of 1 START incremental model kinesis.stg_stream_data............... [RUN] 15:40:59 | 1 of 1 OK created incremental model kinesis.stg_stream_data.......... [CREATE TABLE (1.6k rows, 71.9 KB processed) in 3.01s] 15:40:59 | 15:40:59 | Finished running 1 incremental model in 5.30s. Completed successfully Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
データモデルがテーブルとして生成されました。
実際に実行されたクエリですが、WHERE句のないSELECT文が、そのままCREATE TABLEされています。これは初回実行なので、is_incremental
がFalseとなったためです。
データを増やして、再度モデルを実行する
ローデータ側を増やしてみて、再度モデルを実行してみます。incremental
なので、データモデルは再構築ではなく、増分更新となるはずです。
Firehose側でデータを増やします。
Fivetranで同期し、ローデータ側が1560→2600に増えたことを確認します。
データモデルを再度実行します。CREATEではなくMERGEとなっていますね。
$ dbt run --models stg_stream_data Running with dbt=0.18.1 Found 6 models, 11 tests, 0 snapshots, 2 analyses, 326 macros, 0 operations, 1 seed file, 2 sources 16:22:50 | Concurrency: 4 threads (target='kinesis') 16:22:50 | 16:22:50 | 1 of 1 START incremental model kinesis.stg_stream_data............... [RUN] 16:22:53 | 1 of 1 OK created incremental model kinesis.stg_stream_data.......... [MERGE (1.0k rows, 132.1 KB processed) in 2.79s] 16:22:53 | 16:22:53 | Finished running 1 incremental model in 5.64s. Completed successfully Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
では、DWH側のクエリ履歴を見てみましょう。
WHERE句内の条件式が動作し、増分したレコードだけマージされていますね。検証としては成功です。
つかいみち
「incremental」が向いているのは、下記のようなデータだと思いました(公式ドキュメントにも書いてあるけど…)。
- データ量が純粋に積み上がっていく
- 原則レコードの削除が無い
- データ量がかなり大きい
- 億単位〜
- データ構造の変更が原則無い
- カラムの変更など
アプリケーションのイベントデータなどが該当するのではないでしょうか。膨大に積み上がっていくデータの場合、毎度データモデルを再構築していては、DWHに多大な負荷がかかります。SnowflakeやBigQueryの場合は、コストも馬鹿にならなくなってきます。そういう時、モデルのMaterializationを「incremental」をすることで、増えた分だけをクエリの対象にし、DWHの負荷やコストを抑えることができます。
逆に、マスタデータのような、カラム等の仕様が変わる(ことが予想される)データには、「incremental」は向いていないでしょうね。
また、「incremental」は、増分を特定するために、モデル内に少々ロジックを組む必要があるため、データモデルのクエリの可読性は下がります。ここらへんはトレードオフなので、じっくり検討しましょう。
おわりに
「Materialization」についても、データ側の知見(どういう仕様で更新されていくのか等)が分かってないと、適切な選択をするのは難しいと思います。